package chatrooms

import (
	"context"
	"dash/collectormsg"
	"dash/messages"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/go-kit/kit/log"
	"github.com/go-redis/redis/v8"
	"github.com/olivere/elastic/v7"
	"os"
	"time"
)

// ChatroomRepository is the data-access contract for chatrooms. The
// implementation in this file (defaultChatroomRepository) serves it from a
// SQL database, Redis and Elasticsearch.
type ChatroomRepository interface {
	// Get loads the chatroom with the given id, including its member list.
	Get(ctx context.Context, id string) (*messages.Chatroom, error)
	// LastMessageTimestamp returns the latest-activity value recorded for the
	// chatroom.
	LastMessageTimestamp(ctx context.Context, id string) (int64, error)
	// AlarmMessageCount7 returns the number of alarm messages of the chatroom
	// within the last 7 days.
	AlarmMessageCount7(ctx context.Context, id string) (int64, error)
	// MessageCount7 returns the number of messages of the chatroom summed
	// over the last 7 calendar days (today included).
	MessageCount7(ctx context.Context, id string) (int64, error)
	// MemberCount returns the number of member rows linked to the chatroom.
	MemberCount(ctx context.Context, id string) (int64, error)
	// GetChatroomCollectors fills the collector-related fields of the given
	// chatroom (collector ids, organization paths, collecting users) and
	// returns that same chatroom.
	GetChatroomCollectors(ctx context.Context, chatroomId string, chatroom *messages.Chatroom) (*messages.Chatroom, error)
	// UpdateComment replaces the comment of the chatroom.
	UpdateComment(ctx context.Context, id, comment string) error
	// UpdateTags replaces the tag list of the chatroom.
	UpdateTags(ctx context.Context, id string, tags []string) error
	// UpdateCategory assigns the chatroom to the given category.
	UpdateCategory(ctx context.Context, id, categoryId string) error
	// UpdateWatched sets the watched flag of the chatroom.
	UpdateWatched(ctx context.Context, id string, watch bool) error
	// GetByCategoryId returns the ids of all chatrooms in the given category.
	GetByCategoryId(ctx context.Context, categoryId string) ([]string, error)
}

// Sentinel errors returned by the repository methods in this file.
var (
	// NoIdErr reports that the requested id does not exist.
	NoIdErr = errors.New("id不存在")
	// UpdateErr reports that an update statement failed.
	UpdateErr = errors.New("更新失败")
	// DataTypeErr reports that the supplied data could not be encoded.
	DataTypeErr = errors.New("数据格式错误")
)

// defaultChatroomRepository is the ChatroomRepository implementation built by
// NewChatroomRepository.
type defaultChatroomRepository struct {
	// SqlConn is used for the chatroom, member, category and collector tables.
	SqlConn                *sql.DB
	// RedisConn is used for the latest-activity and daily message counters.
	RedisConn              *redis.Client
	// EsConn is used for the "message" and "alarm_message" searches.
	EsConn                 *elastic.Client
	// logger receives diagnostics; its return value is discarded by callers
	// in this file.
	logger                 log.Logger
	// organizationRepository supplies the organization list used to build
	// collector organization paths.
	organizationRepository collectormsg.OrganizationRepository
}

// GetByCategoryId returns the ids of all chatrooms whose categoryId column
// equals categoryId.
//
// A category without chatrooms yields an empty, non-nil slice and a nil
// error. Rows that fail to scan are logged and skipped. A failure to run the
// query, or an error that interrupts row iteration, is logged and reported as
// a generic error so that a truncated list is never returned as a success.
func (r *defaultChatroomRepository) GetByCategoryId(ctx context.Context, categoryId string) ([]string, error) {
	// QueryContext ties the query to the caller's cancellation/deadline.
	// Note: Query/QueryContext never return sql.ErrNoRows (only Row.Scan
	// does), so no special case for it is needed here.
	rows, err := r.SqlConn.QueryContext(ctx, `SELECT id FROM chatroom WHERE categoryId=?;`, categoryId)
	if err != nil {
		_ = r.logger.Log("get chatroom by categoryId error:", err, "categoryId:", categoryId)
		return nil, errors.New("获取分类群失败")
	}
	defer func() {
		if closeErr := rows.Close(); closeErr != nil {
			_ = r.logger.Log("sql error", closeErr.Error())
		}
	}()

	chatroomIds := make([]string, 0)
	for rows.Next() {
		var chatroomId string
		if err := rows.Scan(&chatroomId); err != nil {
			_ = r.logger.Log("get chatroom by categoryId scan error:", err, "categoryId:", categoryId)
			continue
		}
		chatroomIds = append(chatroomIds, chatroomId)
	}
	// rows.Next() returns false both at the end of the result set and on a
	// mid-stream failure; rows.Err() tells the two apart.
	if err := rows.Err(); err != nil {
		_ = r.logger.Log("get chatroom by categoryId error:", err, "categoryId:", categoryId)
		return nil, errors.New("获取分类群失败")
	}
	return chatroomIds, nil
}

// Get loads the chatroom identified by id and attaches its current members,
// each with the number of messages it has sent in that chatroom.
//
// Any failure of the chatroom query is reported as NoIdErr; failures other
// than "no rows" are additionally logged. A failure to load the members is
// logged and the chatroom is returned without members.
//
// NOTE(review): the query INNER JOINs member on ownerId, so a chatroom whose
// owner has no row in member also yields NoIdErr — confirm this is intended.
func (r *defaultChatroomRepository) Get(ctx context.Context, id string) (*messages.Chatroom, error) {
	var row chatroomRow
	// QueryRowContext propagates the caller's cancellation/deadline to the
	// database call.
	err := r.SqlConn.QueryRowContext(ctx, `SELECT 
			a.id,
			a.name,
			a.dataSource,
			a.avatar,
			a.ownerId,
			a.usedName,
			a.captureTimestamp,
			a.categoryId,
			a.tags,
			a.comment,
			b.nickname
		FROM chatroom AS a 
		    INNER JOIN member AS b ON a.ownerId=b.id 
			LEFT JOIN category AS c ON a.categoryId=c.id
			WHERE a.id=?;`, id).Scan(
		&row.Id,
		&row.Name,
		&row.DataSource,
		&row.Avatar,
		&row.OwnerId,
		&row.UsedName,
		&row.CaptureTimestamp,
		&row.CategoryId,
		&row.Tags,
		&row.Comment,
		&row.OwnerName,
	)
	if err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			_ = r.logger.Log("get chatroom error:", err, "chatroom id:", id)
		}
		return nil, NoIdErr
	}

	chatroom := row.ToChatroom()

	// Members are best-effort: on error allMembers is nil and the loop below
	// is a no-op.
	allMembers, err := r.GetChatroomMembers(id)
	if err != nil {
		_ = r.logger.Log("get chatroom members:", err)
	}
	for _, memberInfo := range allMembers {
		member := memberInfo.ToMember()
		// One Elasticsearch search per member.
		member.MsgCount = r.memberMessageCount(ctx, id, memberInfo.Id.String)
		chatroom.Members = append(chatroom.Members, member)
	}
	return chatroom, nil
}

// NewChatroomRepository wires the given SQL, Redis and Elasticsearch clients,
// the organization repository and the logger into a ChatroomRepository.
// The arguments are stored as given; nothing is validated or connected here.
func NewChatroomRepository(conn *sql.DB,
	redisClient *redis.Client,
	esCLient *elastic.Client,
	organizationRepository collectormsg.OrganizationRepository,
	logger log.Logger) ChatroomRepository {
	repo := new(defaultChatroomRepository)
	repo.SqlConn = conn
	repo.RedisConn = redisClient
	repo.EsConn = esCLient
	repo.organizationRepository = organizationRepository
	repo.logger = logger
	return repo
}

// ChatroomMemberRow is the scan target for one chatroom member as read by
// GetChatroomMembers. All columns are nullable.
type ChatroomMemberRow struct {
	// Id is chatroom_member.memberId.
	Id          sql.NullString
	// Nickname is member.nickname of the member (LEFT JOIN, may be NULL).
	Nickname    sql.NullString
	// Alias is chatroom_member.alias.
	Alias       sql.NullString
	// JoinMethod is chatroom_member.joinMethod.
	JoinMethod  sql.NullString
	// InviterId is chatroom_member.inviterId.
	InviterId   sql.NullString
	// InviterName is member.nickname of the inviter, looked up separately.
	InviterName sql.NullString
}

// ColletorRow is the scan target for one collector/user pair as read by
// GetChatroomCollectors. All columns are nullable.
type ColletorRow struct {
	// CollectorId is collector_chatroom.collectorId.
	CollectorId sql.NullString
	// UserId is collector_user.userId (LEFT JOIN, may be NULL).
	UserId      sql.NullString
	// Name is user.name (LEFT JOIN, may be NULL).
	Name        sql.NullString
	// OrganId is user.organId (LEFT JOIN, may be NULL).
	OrganId     sql.NullString
}

// chatroomRow is the scan target for one chatroom record; ToChatroom converts
// it into a messages.Chatroom.
type chatroomRow struct {
	Id               sql.NullString
	Name             sql.NullString
	OwnerId          sql.NullString
	// OwnerName is the owner's member.nickname.
	OwnerName        sql.NullString
	Avatar           sql.NullString
	CaptureTimestamp sql.NullInt64
	DataSource       sql.NullString
	// MessageCount is not scanned by Get in this file, so it stays zero there.
	MessageCount     sql.NullInt64
	// Tags holds the raw column bytes; ToChatroom decodes them as a JSON
	// array of strings.
	Tags             []byte
	CategoryId       sql.NullString
	Comment          sql.NullString
	// UsedName holds the raw column bytes; ToChatroom decodes them as a JSON
	// array of strings.
	UsedName         []byte
	// CategoryName is not scanned by Get and not read by ToChatroom in this
	// file.
	CategoryName     sql.NullString
}

// UpdateComment overwrites the comment column of the chatroom with the given
// id. A database failure is logged and reported as UpdateErr. An id that
// matches no row is not treated as an error.
func (r *defaultChatroomRepository) UpdateComment(ctx context.Context, id, comment string) error {
	// ExecContext lets the caller's cancellation/deadline abort the statement.
	if _, err := r.SqlConn.ExecContext(ctx, `UPDATE chatroom SET comment=? WHERE id=?;`, comment, id); err != nil {
		_ = r.logger.Log("update chatroom comment", err)
		return UpdateErr
	}
	return nil
}

// UpdateTags stores tags, encoded as a JSON array, in the tags column of the
// chatroom with the given id.
//
// It returns DataTypeErr when the tags cannot be JSON-encoded and UpdateErr
// (after logging) when the database statement fails. An id that matches no
// row is not treated as an error. A nil slice is stored as JSON null.
func (r *defaultChatroomRepository) UpdateTags(ctx context.Context, id string, tags []string) error {
	tagsJson, err := json.Marshal(tags)
	if err != nil {
		return DataTypeErr
	}
	// ExecContext lets the caller's cancellation/deadline abort the statement.
	if _, err := r.SqlConn.ExecContext(ctx, `UPDATE chatroom SET tags=? WHERE id=?;`, tagsJson, id); err != nil {
		_ = r.logger.Log("update chatroom tags", err)
		return UpdateErr
	}
	return nil
}

// UpdateCategory assigns the chatroom with the given id to categoryId.
//
// A non-empty categoryId must exist in the category table, otherwise NoIdErr
// is returned. The previous implementation ran the existence SELECT through
// Exec, which succeeds even when no row matches, so the check never fired;
// it is now a real row lookup. A database failure during the lookup is logged
// and also reported as NoIdErr; a failure of the UPDATE is logged and
// reported as UpdateErr.
//
// NOTE(review): an empty categoryId skips the lookup so that the previously
// working "clear the category" call keeps succeeding — confirm whether
// callers rely on that, and tighten if not.
func (r *defaultChatroomRepository) UpdateCategory(ctx context.Context, id, categoryId string) error {
	if categoryId != "" {
		var found sql.NullString
		err := r.SqlConn.QueryRowContext(ctx, `SELECT id from category WHERE id=?;`, categoryId).Scan(&found)
		if err != nil {
			// sql.ErrNoRows is the expected "unknown category" outcome; only
			// unexpected failures are worth a log line.
			if !errors.Is(err, sql.ErrNoRows) {
				_ = r.logger.Log("update chatroom category", err)
			}
			return NoIdErr
		}
	}
	if _, err := r.SqlConn.ExecContext(ctx, `UPDATE chatroom SET categoryId=? WHERE id=?;`, categoryId, id); err != nil {
		_ = r.logger.Log("update chatroom category", err)
		return UpdateErr
	}
	return nil
}

// UpdateWatched sets the watched column of the chatroom with the given id.
// A database failure is logged and reported as UpdateErr. An id that matches
// no row is not treated as an error.
func (r *defaultChatroomRepository) UpdateWatched(ctx context.Context, id string, watch bool) error {
	// ExecContext lets the caller's cancellation/deadline abort the statement.
	if _, err := r.SqlConn.ExecContext(ctx, `UPDATE chatroom SET watched=? WHERE id=?;`, watch, id); err != nil {
		_ = r.logger.Log("update chatroom watched", err)
		return UpdateErr
	}
	return nil
}

// MemberCount returns the number of chatroom_member rows with a non-NULL
// memberId for the given chatroom id. An unknown chatroom id therefore
// yields 0 and a nil error.
//
// NOTE(review): unlike GetChatroomMembers this count does not filter on
// quit=false — confirm whether members who left should be included.
func (r *defaultChatroomRepository) MemberCount(ctx context.Context, id string) (int64, error) {
	var count sql.NullInt64
	// QueryRowContext propagates the caller's cancellation/deadline.
	err := r.SqlConn.QueryRowContext(ctx, "SELECT COUNT(memberId) AS memberCount FROM chatroom_member WHERE chatroomId=?;", id).
		Scan(&count)
	if errors.Is(err, sql.ErrNoRows) {
		return 0, errors.New("群id不存在")
	}
	if err != nil {
		_ = r.logger.Log("count chatroom member error:", err)
		return 0, errors.New("database error")
	}
	return count.Int64, nil
}

// LastMessageTimestamp reads the Redis key "chatroomLatestActiveTime:<id>"
// and returns its value parsed as an int64. Any error from the Redis client
// or from the integer conversion is returned unchanged.
func (r *defaultChatroomRepository) LastMessageTimestamp(ctx context.Context, id string) (int64, error) {
	redisKey := fmt.Sprintf("chatroomLatestActiveTime:%s", id)
	reply := r.RedisConn.Get(ctx, redisKey)
	return reply.Int64()
}

// AlarmMessageCount7 returns how many documents in the "alarm_message" index
// match the given chatroom id and have a timestamp within the last 7 days.
// The lower bound is expressed in milliseconds since the Unix epoch, derived
// from the current time truncated to whole seconds.
//
// Search errors are returned unchanged.
func (r *defaultChatroomRepository) AlarmMessageCount7(ctx context.Context, id string) (int64, error) {
	const sevenDaysMs int64 = 60 * 60 * 24 * 7 * 1000
	since := time.Now().Unix()*1000 - sevenDaysMs

	query := elastic.NewBoolQuery().
		Must(elastic.NewMatchQuery("chatroom.id", id)).
		Filter(elastic.NewRangeQuery("timestamp").Gte(since))

	results, err := r.EsConn.Search("alarm_message").
		Query(query).
		// Only the total is needed, so do not fetch any hit documents.
		Size(0).
		// Without this Elasticsearch 7 stops counting at 10,000 hits and the
		// reported total would silently be capped.
		TrackTotalHits(true).
		Do(ctx)
	if err != nil {
		return 0, err
	}
	// TotalHits() is nil-safe, unlike results.Hits.TotalHits.Value.
	return results.TotalHits(), nil
}

// MessageCount7 sums the per-day message counters of the chatroom for today
// and the six preceding days. Counters are read from the Redis keys
// "dailyChatroomMsgCount:<id>:<YYYYMMDD>", with dates taken from time.Now().
//
// A day whose key does not exist counts as 0; previously a single missing
// day made the whole call fail with redis.Nil. Any other Redis or conversion
// error stops the loop and is returned together with the sum accumulated so
// far.
func (r *defaultChatroomRepository) MessageCount7(ctx context.Context, id string) (int64, error) {
	const keyFormat = "dailyChatroomMsgCount:%s:%s"
	today := time.Now()
	var total int64
	for offset := 0; offset > -7; offset-- {
		date := today.AddDate(0, 0, offset).Format("20060102")
		dailyCount, err := r.RedisConn.Get(ctx, fmt.Sprintf(keyFormat, id, date)).Int64()
		if errors.Is(err, redis.Nil) {
			// No counter for this day: nothing to add.
			continue
		}
		if err != nil {
			return total, err
		}
		total += dailyCount
	}
	return total, nil
}

// GetChatroomMembers returns the members of the given chatroom that have not
// quit (quit=false), each with alias, join method, inviter id, nickname and,
// when an inviter id is present, the inviter's nickname.
//
// Rows that fail to scan are logged and skipped. A failing inviter lookup is
// logged and leaves InviterName unset. A failure to run the member query, or
// an error that interrupts row iteration, is logged and reported as a generic
// error.
func (r *defaultChatroomRepository) GetChatroomMembers(chatroomId string) ([]*ChatroomMemberRow, error) {
	memberRows, err := r.SqlConn.Query(`
	SELECT a.memberId,a.alias,a.joinMethod,a.inviterId,b.nickname 
	FROM chatroom_member as a LEFT JOIN member as b 
	ON a.memberId=b.id 
	WHERE a.chatroomId=? AND a.quit=false;`, chatroomId)
	if err != nil {
		_ = r.logger.Log("get chatroom member error:", err)
		return nil, errors.New("获取群成员失败")
	}
	defer func() {
		if closeErr := memberRows.Close(); closeErr != nil {
			_ = r.logger.Log("sql error", closeErr.Error())
		}
	}()

	allMembers := make([]*ChatroomMemberRow, 0)
	for memberRows.Next() {
		row := new(ChatroomMemberRow)
		if err := memberRows.Scan(&row.Id, &row.Alias, &row.JoinMethod, &row.InviterId, &row.Nickname); err != nil {
			_ = r.logger.Log("get chatroom scan error:", err)
			continue
		}
		allMembers = append(allMembers, row)
	}
	// Next() also returns false on a mid-stream failure; do not present a
	// truncated member list as a success.
	if err := memberRows.Err(); err != nil {
		_ = r.logger.Log("get chatroom member error:", err)
		return nil, errors.New("获取群成员失败")
	}

	// Resolve inviter nicknames only after the cursor above is exhausted, so
	// the lookups do not need a second pooled connection while the first is
	// still held. Each distinct inviter is queried once.
	inviterNames := make(map[string]sql.NullString)
	for _, row := range allMembers {
		inviterId := row.InviterId.String
		if len(inviterId) == 0 {
			continue
		}
		name, cached := inviterNames[inviterId]
		if !cached {
			err := r.SqlConn.QueryRow(`SELECT nickname FROM member WHERE id=?;`, inviterId).Scan(&name)
			if err != nil {
				if errors.Is(err, sql.ErrNoRows) {
					_ = r.logger.Log("未查询到邀请人", row.InviterId)
				} else {
					_ = r.logger.Log("get invitor error:", err)
				}
			}
			inviterNames[inviterId] = name
		}
		row.InviterName = name
	}
	return allMembers, nil
}

// GetChatroomCollectors fills the collector-related fields of chatroom and
// returns that same pointer:
//
//   - chatroom.CollectorId: the distinct collector ids linked to chatroomId,
//     in first-seen order;
//   - chatroom.CollectorOrgan: for each distinct organization id found, the
//     organization's name prefixed with the names of all its ancestors
//     (root first, no separator), in first-seen order;
//   - chatroom.User: one entry per result row, built as
//     "<organization path>--<user name>(<user id>)", where each part is
//     omitted when its column is NULL.
//
// Values are appended to whatever the slices already contain. chatroom must
// be non-nil.
//
// A failure of the collector query (or of its row iteration) is logged and
// returned as a generic error. A failure to load the organization list is
// only logged; organization paths are then empty.
func (r *defaultChatroomRepository) GetChatroomCollectors(ctx context.Context, chatroomId string, chatroom *messages.Chatroom) (*messages.Chatroom, error) {
	// Columns: collector account id, collecting user's id, collecting user's
	// name, organization id of that user.
	collectorRows, err := r.SqlConn.QueryContext(ctx, `SELECT 
	  a.collectorId,b.userId,c.name,c.organId FROM collector_chatroom as a 
	      LEFT JOIN collector_user as b ON a.collectorId=b.collectorId
	      LEFT JOIN user as c ON c.Id=b.userId
			WHERE a.chatroomId=?;`, chatroomId)
	if err != nil {
		_ = r.logger.Log("get chatroom collector error:", err)
		return nil, errors.New("获取采集信息失败")
	}
	defer func() {
		if closeErr := collectorRows.Close(); closeErr != nil {
			_ = r.logger.Log("sql close error", closeErr.Error())
		}
	}()

	allCollectors := make([]*ColletorRow, 0)
	for collectorRows.Next() {
		row := new(ColletorRow)
		if err := collectorRows.Scan(&row.CollectorId, &row.UserId, &row.Name, &row.OrganId); err != nil {
			_ = r.logger.Log("get chatroom  collector scan error:", err)
			continue
		}
		allCollectors = append(allCollectors, row)
	}
	// Next() also returns false on a mid-stream failure; do not continue with
	// a truncated result.
	if err := collectorRows.Err(); err != nil {
		_ = r.logger.Log("get chatroom collector error:", err)
		return nil, errors.New("获取采集信息失败")
	}

	// De-duplicate collector ids and organization ids while keeping the order
	// in which they were first seen, so the output is deterministic.
	seenCollectors := make(map[string]struct{}, len(allCollectors))
	seenOrgans := make(map[string]struct{}, len(allCollectors))
	organIds := make([]string, 0, len(allCollectors))
	for _, collectorRow := range allCollectors {
		collectorId := collectorRow.CollectorId.String
		if _, dup := seenCollectors[collectorId]; !dup {
			seenCollectors[collectorId] = struct{}{}
			chatroom.CollectorId = append(chatroom.CollectorId, collectorId)
		}
		organId := collectorRow.OrganId.String
		if _, dup := seenOrgans[organId]; !dup {
			seenOrgans[organId] = struct{}{}
			organIds = append(organIds, organId)
		}
	}

	// Full organization list, indexed by id. On error the list is empty and
	// every lookup below simply misses.
	organizations, err := r.organizationRepository.GetOrganizationTree(ctx)
	if err != nil {
		_ = r.logger.Log("get chatroom organization error:", err)
	}
	organizationsMap := map[string]*collectormsg.CollectorOrganization{}
	for _, organ := range organizations {
		organizationsMap[organ.Id] = organ
	}

	// Build "<root>...<parent><organ>" for every distinct organization id.
	organInfoMap := make(map[string]string, len(organIds))
	for _, organId := range organIds {
		organ, exists := organizationsMap[organId]
		if !exists {
			continue
		}
		path := organ.Name
		visited := map[string]struct{}{organId: {}}
		parentId := organ.ParentId
		for len(parentId) > 0 {
			parentOrgan, exists := organizationsMap[parentId]
			if !exists {
				// Unknown parent: stop climbing. (A `continue` here would
				// spin forever because parentId never changes.)
				break
			}
			if _, seen := visited[parentId]; seen {
				// Cyclic parent chain in the data: stop instead of looping.
				break
			}
			visited[parentId] = struct{}{}
			path = parentOrgan.Name + path
			parentId = parentOrgan.ParentId
		}
		organInfoMap[organId] = path
		chatroom.CollectorOrgan = append(chatroom.CollectorOrgan, path)
	}

	for _, collectorRow := range allCollectors {
		// Plain concatenation: names come from the database and must never be
		// interpreted as a fmt format string.
		var userInfo string
		if collectorRow.OrganId.Valid {
			userInfo = organInfoMap[collectorRow.OrganId.String]
		}
		if collectorRow.Name.Valid {
			userInfo += "--" + collectorRow.Name.String
		}
		if collectorRow.UserId.Valid {
			userInfo += "(" + collectorRow.UserId.String + ")"
		}
		chatroom.User = append(chatroom.User, userInfo)
	}
	return chatroom, nil
}

// memberMessageCount returns how many documents in the "message" index match
// both the given chatroom id and the given member id. A search error is
// logged and reported as 0.
func (r *defaultChatroomRepository) memberMessageCount(ctx context.Context, chatroomId, memberId string) int64 {
	query := elastic.NewBoolQuery().
		Must(elastic.NewMatchQuery("chatroom.id", chatroomId)).
		Filter(elastic.NewMatchQuery("member.id", memberId))

	results, err := r.EsConn.Search("message").
		Query(query).
		// Only the total is needed, so do not fetch any hit documents.
		Size(0).
		// Without this Elasticsearch 7 stops counting at 10,000 hits and the
		// reported total would silently be capped.
		TrackTotalHits(true).
		Do(ctx)
	if err != nil {
		_ = r.logger.Log("es search chatroom member message count error:", err)
		return 0
	}
	// TotalHits() is nil-safe, unlike results.Hits.TotalHits.Value.
	return results.TotalHits()
}

// ToMember converts the scanned row into a messages.Member. NULL columns
// become empty strings. The alias is always appended to Aliases (so a NULL
// alias contributes one empty entry), and Inviter is always set, even when
// the row carries no inviter.
func (row *ChatroomMemberRow) ToMember() *messages.Member {
	inviter := &messages.Member{
		Id:       row.InviterId.String,
		NickName: row.InviterName.String,
	}
	result := &messages.Member{
		Id:       row.Id.String,
		NickName: row.Nickname.String,
	}
	result.Aliases = append(result.Aliases, row.Alias.String)
	// How the member joined the chatroom.
	result.JoinMethod = row.JoinMethod.String
	result.Inviter = inviter
	return result
}

// ToChatroom converts the scanned row into a messages.Chatroom. NULL columns
// become zero values. Tags and UsedName are decoded from their raw bytes as
// JSON string arrays; an empty column gives an empty list, and a decode
// failure is written to a JSON logger on stdout without failing the
// conversion.
func (row *chatroomRow) ToChatroom() *messages.Chatroom {
	// decodeList parses raw as a JSON array of strings into a list created
	// with the given capacity, logging failureMsg when parsing fails.
	decodeList := func(raw []byte, capacity int, failureMsg string) []string {
		list := make([]string, 0, capacity)
		if len(raw) == 0 {
			return list
		}
		if err := json.Unmarshal(raw, &list); err != nil {
			_ = log.NewJSONLogger(os.Stdout).Log(failureMsg, err)
		}
		return list
	}

	owner := messages.Member{
		Id:       row.OwnerId.String,
		NickName: row.OwnerName.String,
	}
	result := &messages.Chatroom{
		Id:               row.Id.String,
		Name:             row.Name.String,
		Avatar:           row.Avatar.String,
		CaptureTimestamp: row.CaptureTimestamp.Int64,
		DataSource:       row.DataSource.String,
		MessageCount:     row.MessageCount.Int64,
		Comment:          row.Comment.String,
		Owner:            owner,
		Category:         row.CategoryId.String,
	}
	result.Tags = decodeList(row.Tags, 5, "json unmarshal chatroom tags error:")
	result.UsedName = decodeList(row.UsedName, 3, "json unmarshal chatroom usedName error:")
	return result
}
